[GLUTEN-10992][VL] Fix MatchError for KeyGroupedPartitioning in native shuffle#12335
[GLUTEN-10992][VL] Fix MatchError for KeyGroupedPartitioning in native shuffle#12335brijrajk wants to merge 1 commit into
Conversation
|
Can you please add some tests for KeyGroupedPartitioning |
There was a problem hiding this comment.
Pull request overview
Fixes a Spark 4.0 native-shuffle crash when Spark produces a ShuffleExchangeExec with KeyGroupedPartitioning (e.g., with V2 bucketing shuffle enabled and only one join side reporting partitioning). The change prevents Gluten from creating a native ColumnarShuffleExchangeExec for an unsupported partitioning type and replaces a runtime scala.MatchError with a clearer Gluten exception.
Changes:
- Add an explicit
KeyGroupedPartitioningfallback inVeloxSparkPlanExecApi.genColumnarShuffleExchangeto return vanillaShuffleExchangeExec(tagged for fallback) instead of creatingColumnarShuffleExchangeExec. - Add a defensive default
case other =>inExecUtil.genShuffleDependencyto throwGlutenNotSupportExceptionrather than ascala.MatchErrorfor unknown partitioning types.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| backends-velox/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala | Adds a default match case to fail fast with GlutenNotSupportException instead of MatchError for unsupported/unknown partitioning types. |
| backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala | Adds an explicit KeyGroupedPartitioning fallback path to avoid creating native columnar shuffle for an unsupported partitioning. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| case other => | ||
| throw new GlutenNotSupportException( | ||
| s"Partitioning ${other.getClass.getSimpleName} is not supported by native shuffle") |
…e shuffle When Spark 4.0's V2 bucketing shuffle (spark.sql.v2.bucketing.shuffle.enabled=true) is used in a join where only one side reports partitioning, Spark generates a ShuffleExchangeExec with KeyGroupedPartitioning as its output. The default case in VeloxSparkPlanExecApi.genColumnarShuffleExchange created a ColumnarShuffleExchangeExec for this node, which then crashed with a scala.MatchError in ExecUtil.genShuffleDependency because KeyGroupedPartitioning was not handled in the native partitioning match. Fix by adding an explicit KeyGroupedPartitioning case to genColumnarShuffleExchange that marks the shuffle for fallback to vanilla Spark. Also harden ExecUtil.genShuffleDependency with an explicit wildcard that throws GlutenNotSupportException instead of a cryptic MatchError for any future unknown partitioning types. The exception now embeds the full partitioning toString (expressions, numPartitions) to aid debugging. Add a dedicated GlutenKeyGroupedPartitioningSuite test (spark40 and spark41) that asserts the KeyGroupedPartitioning shuffle falls back to a vanilla ShuffleExchangeExec and is never offloaded to ColumnarShuffleExchangeExec. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
761cfc7 to
37d4051
Compare
|
@liuneng1994 thanks for the review. Both comments are addressed in the latest push:
|
What changes were proposed in this pull request?
When Spark 4.0's V2 bucketing shuffle (
spark.sql.v2.bucketing.shuffle.enabled=true) is used in a join where only one side reports partitioning, Spark generates aShuffleExchangeExecwithKeyGroupedPartitioningas its output partitioning.The default
case _ =>inVeloxSparkPlanExecApi.genColumnarShuffleExchangecreated aColumnarShuffleExchangeExecfor this node without validation. When the query executed,ExecUtil.genShuffleDependencycrashed with ascala.MatchErrorbecauseKeyGroupedPartitioningwas missing from its exhaustive match.Changes:
VeloxSparkPlanExecApi.genColumnarShuffleExchange: add an explicitcase _: KeyGroupedPartitioning =>before the default that adds a fallback tag and returns the vanillaShuffleExchangeExec. This prevents aColumnarShuffleExchangeExecfrom being created for an unsupported partitioning type.ExecUtil.genShuffleDependency: add an explicit wildcardcase other =>that throwsGlutenNotSupportExceptioninstead of the crypticscala.MatchError, as a defensive guard for any future unknown partitioning types.How was this patch tested?
The existing
testGluten("SPARK-41471: shuffle one side: only one side reports partitioning")tests inGlutenKeyGroupedPartitioningSuite(both spark40 and spark41) reproduce the crash exactly — they setV2_BUCKETING_SHUFFLE_ENABLED=truewith only one bucketed side, which triggers aShuffleExchangeExecwithKeyGroupedPartitioningoutput and then callcheckAnswer. After this fix these tests pass withoutMatchError.Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (https://claude.ai/code)
Related issue: #10992